
// This file is part of Module Proxy.

// Module Proxy is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Module Proxy is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Module Proxy.  If not, see <https://www.gnu.org/licenses/>.


//         Copyright (C) 2021 - 2030  关中麦客  
//         All rights reserved
//
//         process.rs
//         socket处理、任务执行
//
//         created by 关中麦客 1036038462@qq.com

use bytes::{Bytes, BufMut, BytesMut};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{Duration, timeout, sleep};
use serde_json::{Value};

use tokio::process::{Command, Child};
// use encoding::all::GBK;
// use encoding::{Encoding, DecoderTrap};

// use log;
use super::cache;

const RSP_OK: &str = "{\"status\":\"ok\"}";
const RSP_ERROR: &str = "{\"status\":\"error\"}";

const TIMEOUT: u32 = 30; //30秒失效

/// 处理socket请求
pub async fn do_socket(mut socket: TcpStream)
{
    //读取12字节长度bytes
    let mut len_buf: [u8; 12] = [0; 12]; 
    match socket.read(&mut len_buf).await
    {
        Ok(c_size) =>
        {
            if c_size != 12   
            {
                log::warn!("socket read len_buf error: len_buf size not 12");
                return;
            }
        },
        Err(err) =>
        {
            log::warn!("socket read length error: {}", err);
            return;
        }
    }

    //解析消息长度
    let _msg_len: usize;
    match String::from_utf8(len_buf.to_vec())
    {
        Ok(s) =>
        {
            let len_str = s.trim();
            match len_str.parse::<usize>()
            {
                Ok(len) =>
                    _msg_len = len,
                Err(err) =>
                {
                    log::warn!("socket read length error: {}", err);
                    return;
                }
            }
        },
        Err(err) =>
        {
            log::warn!("socket read length error: {}", err);
            return;
        }
    }
    
    //读取消息
    let msg: &str;
    let mut buf: [u8; 4096] = [0; 4096];
    match socket.read(&mut buf).await
    {
        Ok(c_size) =>
        {
            match std::str::from_utf8(&buf[..c_size])
            {
                Ok(s) =>
                    msg = s,
                Err(err) =>
                {
                    log::warn!("socket read msg v to str error: {}", err);
                    return;
                }
            }
        },
        Err(err) =>
        {
            log::warn!("socket read msg error: {}", err);
            return;
        }
    };

    //解析json
    // log::info!("req: {}", msg);
    let json = parse_json(msg);
    match json["method"].as_str()
    {
        Some("start") =>
        {
            // json示例： {"method":"start","code":"1","rtsp":"rtsp://admin:Pass1234@2.36.207.50:554"}
            if cmd_start(json).await                    //执行shell命令行
            {
                socket_write(socket, RSP_OK).await;       //socket 返回
            }
            else
            {
                socket_write(socket, RSP_ERROR).await;    //socket 返回
            }
        },
        Some("keepalive") =>
        {
            // json示例： {"method":"keepalive","code":"1"}
            match keepalive(json).await
            {
                true => socket_write(socket, RSP_OK).await,
                false=> socket_write(socket, RSP_ERROR).await,
            }
        },
        Some(_) =>
        {
            log::warn!("error: UnKnown method");
            socket_write(socket, RSP_ERROR).await;   //socket 返回
        },
        None =>
        {
            log::warn!("error: UnKnown req");
            socket_write(socket, RSP_ERROR).await;   //socket 返回
        },
    }
}

// /// 检测超时任务线程
// pub async fn check_timeout()
// {
//     loop
//     {
//         sleep(Duration::from_secs(5)).await;    //间隔5秒
//         cache::check().await;         //查询超时的任务
//         // for code in vec
//         // {
//         //     //组装查询字符串， 2\\video.m3u8  or  2/video.m3u8
//         //     let param: String;
//         //     if cfg!(target_os = "windows")
//         //     {
//         //         param = code.clone() + "\\video.m3u8";
//         //     }
//         //     else
//         //     {
//         //         param = code.clone() + "/video.m3u8";
//         //     }

//         //     log::info!("检测进程：{}", &param);

//         //     //kill 进程
//         //     if let Some(pid) = pid(&param) //获得进程id
//         //     {
//         //         log::info!("获得进程id: {}", pid);
//         //         let b = kill(&pid); 
//         //         log::info!("kill 进程返回: {}", b);    
//         //     }
//         //     else
//         //     {
//         //         log::info!("未获取到检测：{}", &param);
//         //     }

//         //     log::info!("删除缓存: {}", code);

//         //     //删除缓存中的任务
//         //     cache::del(code).await;           
//         // }
//     }
// }

///执行ffmpeg命令行
/// 成功创建m3u8文件返回true 
async fn cmd_start(json: Value) -> bool
{
    if let Some(code) = json["code"].as_str()
    {
        if let Some(rtsp) = json["rtsp"].as_str()
        {
            let cmd_conf = super::conf::params();
            let cmd_str = cmd_conf.replace("{1}", rtsp);
            let cmd_str = cmd_str.replace("{2}", &super::file::outfile(code));

            if let Some(_) = cache::get(&code).await
            {
                return true;    //任务已经存在，且未失效
            }

            //创建任务输出目录（如果不存在）
            super::file::create_task_dir(code);  
            //如果video.m3u8文件已存在且超时，删除之
            super::file::del_timeout_file(code, 10);  //10秒未更新超时

            log::info!("ffmpeg {}", cmd_str);                   //cmd 命令行
            let args: Vec<&str> = cmd_str.split(" ").collect(); //分割args
            // 执行ffmpeg子进程，kill_on_drop(true)在child消亡时自动kill并回收子进程
            if let Ok(child) = Command::new("ffmpeg").args(args).spawn()
            {
                check_task(code.to_string(), child).await;
            }
            else
            {
                log::warn!("ffmpeg command failed to start");
            }

            // log::info!("准备检测m3u8文件是否生成");

            //等待video.m3u8生成, 8秒超时
            let ret = timeout(Duration::from_secs(8), check_m3u8_ok(&super::file::outfile(code))).await; 
            // 未超时（video.m3u8生成）
            if !ret.is_err()
            {
                cache::set(code.to_string()).await;     //缓存添加或更新
                return true;                            //未超时（m3u8已创建）
            }  
        }
    }

    false
}

async fn check_task(code: String, mut child: Child)
{
    tokio::spawn(async move {
        loop
        {
            sleep(Duration::from_secs(5)).await;    //间隔5秒

            if let Some(timestamp) = cache::get(&code).await
            {
                if super::util::sec_timestamp() - timestamp > TIMEOUT  //失效
                {
                    break;
                }
            }
        }

        if let Ok(_) = child.kill().await           //kill子进程
        {
            // log::info!("kill OK");
            if let Ok(_) = child.wait().await       //回收子进程（避免僵尸）
            {
                // log::info!("wait OK");
            }
        }

        cache::del(code.to_string()).await; //删除cache中的任务
    });
}

/// 任务保活 true-成功 false-任务不存在或失效
async fn keepalive(json: Value) -> bool
{
    if let Some(code) = json["code"].as_str()
    {
        match cache::get(code).await 
        {
            Some(timestamp) =>
            {
                if super::util::sec_timestamp() - timestamp < TIMEOUT
                {
                    cache::set(code.to_string()).await;     //更新任务时间戳
                    return true;    //成功
                }
            },
            None =>
            {
                return false;   //任务不存在或失效
            },
        }
    }

    false   //非法json
}

/// socket 写
async fn socket_write(mut socket: TcpStream, msg: &str)
{
    let msg_bytes = msg.as_bytes();
    //发送长度行
    let len_msg = format!("{:>10}\r\n", msg_bytes.len());
    //组装发送消息
    let mut buf = BytesMut::new();
    buf.put(len_msg.as_bytes());
    buf.put(msg_bytes);
    let send_bytes = Bytes::from(buf);

    //发送
    if let Err(err) = socket.write_all(&send_bytes).await
    {
        log::warn!("socket write error: {}", err);
    }
}

/// 解析json
fn parse_json(msg: &str) -> Value
{
    match serde_json::from_str(msg)
    {
        Ok(json) => json,
        Err(err) =>
        {
            log::warn!("parse json error: {}", err);
            let json = serde_json::json!({"method":"error"});
            json
        }
    }
}

/// 检测m3u8文件是否创建
async fn check_m3u8_ok(m3u8_path: &str)
{
    loop
    {
        sleep(Duration::from_millis(200)).await;

        if super::file::file_exist(m3u8_path)
        {
            return;
        }
    }
}

// /// 根据进程参数（例如 1\video.m3u8 或 1/video.m3u8） 返回进程ID
// fn pid(params: &str) -> Option<String>
// {
//     if cfg!(target_os = "windows")
//     {
//         return win_pid(params);
//     }
//     else
//     {
//         return linux_pid(params);
//     }
// }

// fn win_pid(params: &str) -> Option<String>
// {
//     match Command::new("wmic")
//             .args(["process", "where", "caption='ffmpeg.exe'", "get", "processid,commandline", "/value"])
//             .output()
//     {
//         Ok(output) =>
//         {
//             if output.status.success()
//             {
//                 if let Ok(rsp) = String::from_utf8(output.stdout)
//                 {
//                     let rsp = rsp.trim();
//                     let v: Vec<&str> = rsp.split("CommandLine=").collect();  //信息分割
//                     for item in v
//                     {
//                         if item.contains(params)    //包含查询字符串
//                         {
//                             let v_id: Vec<&str> = item.split("=").collect();  //分割出PID
//                             let pid = v_id[v_id.len()-1].to_string();
//                             let pid = pid.trim();
//                             return Some(pid.to_string());
//                         }
//                     }
//                 }
//             }
//             else
//             {
//                 let decoded_string = GBK.decode(&output.stderr, DecoderTrap::Strict).unwrap();
//                 println!("status: {}", output.status);      //错误状态码
//                 println!("stderr: {}",  decoded_string);    //错误信息
//             }
//         },
//         Err(err) =>
//         {
//             println!("error: {}", err);     //执行command错误信息
//         },
//     };

//     None
// }

// fn linux_pid(params: &str) -> Option<String>
// {
//     match Command::new("ps")
//         .args(["-ef", "|", "grep'", params, "|", "grep", "-v", "grep"])
//         .output()
//     {
//         Ok(output) =>
//         {
//             if let Ok(rsp) = String::from_utf8(output.stdout)
//             {
//                 log::info!("获得ffmpeg进程id: {}", rsp);
//                 let v: Vec<&str> = rsp.split("CommandLine=").collect();
//                 log::info!("vec len: {}", v.len());
//                 if v.len() > 2
//                 {
//                     let pid = v[1].trim();
//                     log::info!("进程id: {}", pid);
//                     return Some(pid.to_string());
//                 }
//             }
//         },
//         Err(err) =>
//         {
//             log::warn!("ps error: {}", err);     //执行command错误信息
//         },
//     };

//     None
// }

// /// kill 进程
// fn kill(pid: &str) -> bool
// {
//     log::info!("kill {}", pid);
//     if cfg!(target_os = "windows")
//     {
//         return win_kill(pid);
//     }
//     else
//     {
//         return linux_kill(pid);
//     }
// }

// fn win_kill(pid: &str) -> bool
// {
//     match Command::new("taskkill")
//         .args(["/F", "/PID", pid])
//         .output()   
//     {
//         Ok(output) => 
//         {
//             if output.status.success()
//             {
//                 return true;
//             }
//             else
//             {
//                 log::warn!("Failed to taskkill the process:");
//                 let decoded_string = GBK.decode(&output.stderr, DecoderTrap::Strict).unwrap();
//                 log::warn!("status: {}", output.status);      //错误状态码
//                 log::warn!("stderr: {}",  decoded_string);    //错误信息
//             }
//         },
//         Err(err) =>
//         {
//             log::warn!("error: {}", err);     //执行command错误信息
//         },
//     }

//     false
// }

// fn linux_kill(pid: &str) -> bool
// {
//     Command::new("kill").arg(pid);   //kill pid
    
//     //查询进程是否还存在
//     match Command::new("ps")
//         .args(["-ef", "|", "grep'", pid, "|", "grep", "-v", "grep"])
//         .output()
//     {
//         Ok(output) =>
//         {
//             if output.stdout.len() == 0
//             {
//                 return true;
//             }
//         },
//         Err(err) =>
//         {
//             log::warn!("Failed to kill the process: {}", err);     //执行command错误信息
//         },
//     }

//     false
// }



